package services

import (
	"fmt"
	"github.com/jakecoffman/cron"
	"github.com/zhongshaofa/swan-jobs/internal/models"
	"github.com/zhongshaofa/swan-jobs/internal/repositorys"
	"github.com/zhongshaofa/swan-jobs/internal/rpc/client"
	pc "github.com/zhongshaofa/swan-jobs/internal/rpc/proto/client"
	"sync"
)

// TaskManageInterface 任务管理模型接口
type TaskManageInterface interface {
	Add(task *models.Task)
	Start()
	Stop()
	Delete(task *models.Task)
	Update(task *models.Task)
	Len() int
}

// CronTask 定时任务信息
type CronTask struct {
	Task *models.Task
}

// CronTaskManage 定时任务管理模型
type CronTaskManage struct {
	cron                       *cron.Cron
	list                       map[int]*CronTask
	mu                         sync.Mutex
	applicationClientInterface repositorys.ApplicationClientInterface
}

// NewCronTaskManage 新建调度器的定时任务管理模型
func NewCronTaskManage() *CronTaskManage {
	c := cron.New()
	return &CronTaskManage{
		cron:                       c,
		list:                       make(map[int]*CronTask),
		applicationClientInterface: repositorys.ApplicationClientInterface(repositorys.ApplicationClientRepository{}),
	}
}

func (receiver *CronTaskManage) Add(task *models.Task) {
	receiver.mu.Lock()
	defer receiver.mu.Unlock()

	key := int(task.ID)
	_, exist := receiver.list[key]
	if exist {
		return
	}
	receiver.list[key] = &CronTask{Task: task}

	appIds := []int{int(task.ID)}
	receiver.cron.AddFunc(task.CronFormula, func() {
		gatherList, err := receiver.applicationClientInterface.GetList(appIds)
		if err != nil || len(gatherList) == 0 {
			return
		}
		ScheduleTaskService.Run(task, gatherList)
	}, task.CrontabName())
}

func (receiver *CronTaskManage) Start() {
	receiver.cron.Start()
}

func (receiver *CronTaskManage) Stop() {
	receiver.cron.Stop()
}

func (receiver *CronTaskManage) Delete(task *models.Task) {
	receiver.mu.Lock()
	defer receiver.mu.Unlock()

	key := int(task.ID)
	_, exist := receiver.list[key]
	if !exist {
		return
	}
	delete(receiver.list, key)

	receiver.cron.RemoveJob(task.CrontabName())
}

func (receiver *CronTaskManage) Update(task *models.Task) {
	receiver.Delete(task)
	receiver.Add(task)
}

func (receiver *CronTaskManage) Len() int {
	if receiver.list == nil {
		return 0
	}
	return len(receiver.list)
}

const (
	AllExecStatus    = 1 // 全部成功
	AllNotExecStatus = 2 // 全部失败
	PartOfExecStatus = 3 // 部分成功
)

type SupervisorTask struct {
	Task       *models.Task
	ClientList []*repositorys.ApplicationClientGather
	TaskStatus int
}

type SupervisorTaskManage struct {
	list                       map[int]*SupervisorTask
	status                     int
	mu                         sync.Mutex
	applicationClientInterface repositorys.ApplicationClientInterface
}

func (supervisorTask *SupervisorTask) UpdateTaskStatus() {
	// 循环请求客户端，判断任务状态
	if supervisorTask.ClientList == nil {
		return
	}

	isFail := false
	isSuccess := false

	for _, gatherClient := range supervisorTask.ClientList {
		connectAddress := fmt.Sprintf("%s:%v", gatherClient.Client.Ip, gatherClient.Client.Port)
		request := &pc.CommonTaskRequest{
			TaskId:   int32(supervisorTask.Task.ID),
			Mode:     int32(supervisorTask.Task.Mode),
			ClientId: int32(gatherClient.ClientId),
		}
		scheduler, err := client.ExistTaskByScheduler(connectAddress, request)

		if err != nil {
			isFail = true
			fmt.Printf("检测任务状态失败:%s:%+v\n", connectAddress, err)
			continue
		}

		if !scheduler.Exist {
			isFail = true
			taskRequest := &pc.TaskRequest{
				Directory:  supervisorTask.Task.Directory,
				Command:    supervisorTask.Task.Command,
				Timeout:    int32(supervisorTask.Task.CronTimeOut),
				TaskId:     int32(supervisorTask.Task.ID),
				Mode:       int32(supervisorTask.Task.Mode),
				ScheduleId: int32(supervisorTask.Task.ID),
				ClientId:   int32(supervisorTask.Task.ID),
			}
			go func() {
				_, _ = client.RunTaskByScheduler(connectAddress, taskRequest)
			}()
		} else {
			isSuccess = true
		}
	}

	if isFail == true && isSuccess == false {
		//全部失败
		supervisorTask.TaskStatus = AllNotExecStatus
	} else if isFail == false && isSuccess == true {
		//全部成功
		supervisorTask.TaskStatus = AllExecStatus
	} else {
		//部分成功
		supervisorTask.TaskStatus = PartOfExecStatus
	}
}

func NewSupervisorTaskManage() *SupervisorTaskManage {
	return &SupervisorTaskManage{
		list:                       make(map[int]*SupervisorTask),
		status:                     models.False,
		applicationClientInterface: repositorys.ApplicationClientInterface(repositorys.ApplicationClientRepository{}),
	}
}

func (receiver *SupervisorTaskManage) Add(task *models.Task) {
	receiver.mu.Lock()
	defer receiver.mu.Unlock()

	key := int(task.ID)
	_, exist := receiver.list[key]
	if exist {
		return
	}

	supervisorTask := &SupervisorTask{
		Task:       task,
		ClientList: nil,
		TaskStatus: AllNotExecStatus,
	}

	appIds := []int{int(task.AppId)}
	gatherList, err := receiver.applicationClientInterface.GetList(appIds)
	supervisorTask.ClientList = nil
	receiver.list[key] = supervisorTask

	if receiver.status == models.True && err == nil && len(gatherList) > 0 {
		go ScheduleTaskService.Run(task, gatherList)
	}
}

func (receiver *SupervisorTaskManage) Start() {
	if receiver.status == models.True {
		return
	}
	if receiver.list == nil {
		return
	}
	for index, supervisorTask := range receiver.list {
		appIds := []int{int(supervisorTask.Task.AppId)}
		gatherList, err := receiver.applicationClientInterface.GetList(appIds)
		receiver.list[index].ClientList = gatherList
		if err == nil && len(gatherList) > 0 {
			go ScheduleTaskService.Run(supervisorTask.Task, gatherList)
		}
	}
	receiver.status = models.True
}

func (receiver *SupervisorTaskManage) Stop() {
	// 循环发送任务停止动作
	for _, supervisorTask := range receiver.list {
		go ScheduleTaskService.Delete(supervisorTask.Task, supervisorTask.ClientList)
	}
}

func (receiver *SupervisorTaskManage) Delete(task *models.Task) {
	receiver.mu.Lock()
	defer receiver.mu.Unlock()

	key := int(task.ID)
	_, exist := receiver.list[key]
	if exist {
		return
	}

	supervisorTask := receiver.list[key]
	go ScheduleTaskService.Delete(supervisorTask.Task, supervisorTask.ClientList)
	delete(receiver.list, key)
}

func (receiver *SupervisorTaskManage) Update(task *models.Task) {
	receiver.Delete(task)
	receiver.Add(task)
}

func (receiver *SupervisorTaskManage) Len() int {
	if receiver.list == nil {
		return 0
	}
	return len(receiver.list)
}
